package com.uxsino.commons.threads;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.*;

/**
 * @ClassName GlobalThreadPool
 * @Description TODO
 * @Author <a href="mailto:royrxc@gmail.com">Ran</a>
 * @Daate 2020/3/20 17:52
 **/
public final class GlobalThreadPool {
    private static final ThreadPoolExecutor executor;
    private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 5;
    private static final int MAX_POOL_SIZE = CORE_POOL_SIZE * 10;
    private static final int IDLE_TIMEOUT_MS = 2 * 60 * 1000;
    private static final int QUEUE_SIZE = MAX_POOL_SIZE * 100;
    private static final Logger LOG = LoggerFactory.getLogger(GlobalThreadPool.class);
    private static final ConcurrentHashMap<String, ThreadPoolExecutor> SINGLE_POOL = new ConcurrentHashMap<>();
    static {
        executor = create("GLOBAL_THREAD_POOL", CORE_POOL_SIZE, MAX_POOL_SIZE, IDLE_TIMEOUT_MS, QUEUE_SIZE, RejectedType.CALL);
        LOG.info("global thread pool init : core_pool_size {}, max_pool_size: {}", CORE_POOL_SIZE, MAX_POOL_SIZE);
    }

    public static Future<?> submit(Runnable run){
        view();
        return executor.submit(run);
    }

    public static <T> Future<T> submit(Callable<T> run){
        view();
        return executor.submit(run);
    }

    public static <T> Future<T> submit(Runnable run, T result){
        view();
        return executor.submit(run, result);
    }

    public static void view(){
        if(executor.getQueue().size() > 100){
            LOG.warn("global queue size reached: {}, and active: {}", executor.getQueue().size(), executor.getActiveCount());
        }
    }

    public static Map<String, Object> state(){
        Map<String, Object> s = Maps.newHashMap();
        s.put("queueSize", executor.getQueue().size());
        s.put("activeCount", executor.getActiveCount());
        s.put("completeCount", executor.getCompletedTaskCount());
        return s;
    }

    public static void destroy(String poolName){
        synchronized (SINGLE_POOL){
            ThreadPoolExecutor pool = SINGLE_POOL.remove(poolName);
            if(pool != null){
                pool.shutdown();
            }
        }
    }

    /**
     * single pool
     * @param poolName
     * @return
     */
    public static ThreadPoolExecutor getPool(String poolName){
        synchronized (SINGLE_POOL){
            return getPool(poolName, 1, Integer.MAX_VALUE, IDLE_TIMEOUT_MS);
        }
    }

    /**
     *
     * @param poolName
     * @param pooSize
     * @param maxPoolSize
     * @param timeoutMs if o then 6000 ms
     * @param queueSize
     * @return
     */
    public static ThreadPoolExecutor getPool(String poolName, int pooSize, int maxPoolSize, int timeoutMs, int queueSize){
        synchronized (SINGLE_POOL){
            ThreadPoolExecutor pool = SINGLE_POOL.get(poolName);
            if(pool == null){
                pool = create(poolName, pooSize, maxPoolSize, timeoutMs, queueSize, RejectedType.CALL);
                SINGLE_POOL.put(poolName, pool);
            }
            return pool;
        }
    }
    /**
     * @param poolName
     * @param pooSize
     * @param maxPoolSize
     * @param timeout if o then 6000 ms
     * @return
     */
    public static ThreadPoolExecutor getPool(String poolName, int pooSize, int maxPoolSize, int timeout ){
        synchronized (SINGLE_POOL){
            return getPool(poolName, pooSize, maxPoolSize, timeout, Integer.MAX_VALUE == maxPoolSize ? Integer.MAX_VALUE : maxPoolSize * 5);
        }
    }

    /**
     * 当线程数已经达到maxPoolSize，且队列已满，的策略
     */
    public enum RejectedType{
        /**
         * 丢弃任务，抛运行时异常
         */
        ABORT,
        /**
         * 执行任务
         */
        CALL,
        /**
         * 忽视，什么都不会发生
         */
        DISCARD,
        /**
         * 从队列中踢出最先进入队列（最后一个执行）的任务
         */
        DISCARDOLD;
        public RejectedExecutionHandler createHandler(){
            switch (this){
                case CALL:
                    return new ThreadPoolExecutor.CallerRunsPolicy();
                case ABORT:
                    return new ThreadPoolExecutor.AbortPolicy();
                case DISCARD:
                    return new ThreadPoolExecutor.DiscardPolicy();
                case DISCARDOLD:
                    return new ThreadPoolExecutor.DiscardOldestPolicy();
                default:
                    return new ThreadPoolExecutor.DiscardPolicy();
            }

        }

    }

    /**
     *
     * @param poolName
     * @param pooSize
     * @param maxPoolSize
     * @param timeoutMs if 0 then 6000 ms
     * @param queueSize
     * @return
     */
    public static ThreadPoolExecutor create(String poolName, int pooSize, int maxPoolSize, int timeoutMs, int queueSize){
        return create(poolName, pooSize, maxPoolSize, timeoutMs, queueSize, RejectedType.CALL);
    }

    /**
     *
     * @param poolName
     * @param pooSize
     * @param maxPoolSize
     * @param timeoutMs if 0 then 6000 ms
     * @param queueSize
     * @param type
     * @return
     */
    public static ThreadPoolExecutor create(String poolName, int pooSize, int maxPoolSize, int timeoutMs, int queueSize, RejectedType type){
        return create(poolName, pooSize, maxPoolSize, timeoutMs, queueSize, type==null?RejectedType.CALL.createHandler():type.createHandler());
    }

    /**
     *
     * @param poolName
     * @param pooSize
     * @param maxPoolSize
     * @param timeoutMs if 0 then 6000 ms
     * @param queueSize
     * @param handler
     * @return
     */
    public static ThreadPoolExecutor create(String poolName, int pooSize, int maxPoolSize, int timeoutMs, int queueSize, RejectedExecutionHandler handler){
        pooSize = Math.max(1, pooSize);
        maxPoolSize = Math.max(pooSize, maxPoolSize);
        timeoutMs = Math.max(1000, timeoutMs);
        queueSize = Math.max(pooSize ,queueSize);
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(pooSize, Math.max(pooSize, maxPoolSize), timeoutMs>0?timeoutMs:IDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueSize), namedThreadFactory);
        pool.allowCoreThreadTimeOut(true);
        pool.setRejectedExecutionHandler(handler==null?RejectedType.CALL.createHandler():handler);

        return pool;
    }
}
